管道是Unix系统上最古老的IPC方法。

FIFO是管道概念的变体,它们之间的差别在于FIFO可以用于任意进程间的通信。

概述

$ ls | wc -l

image-20210130114347946

两个进程并不知道管道的存在,它们只是从标准文件描述符中读取数据和写入数据。

管道的重要特征:

  • 一个管道是一个字节流

    • 使用管道时不存在消息或消息边界的概念
    • 可以从管道中读取任意大小的数据块,不管写入管道的数据块大小是什么
    • 通过管道传递的数据是顺序的,无法随机访问
  • 读空管道会阻塞至数据到来。管道写端关闭后,管道读取进程在读完剩余所有数据后将看到文件结束(即read()返回0)

  • 在管道中数据的传递方向是单向的

  • 可以确保写入不超过PIPE_BUF字节的操作是原子的

  • 管道的容量是有限的

创建和使用管道

#include <unistd.h>
//成功返回0,失败返回-1
int pipe(int filedes[2]);

成功的pipe()调用会在filedes中返回两个打开的文件描述符:

  • filedes[0]:表示管道的读取端
  • filedes[1]:表示管道的写入端

一旦向管道的写入端写入数据之后立即就能从管道的读取端读取数据。

管道上的read()调用会读取的数据量为所请求的字节数与管道中当前存在的字节数两者之间较小的那个。

ioctl(fd, FIONREAD, &cnt)调用返回文件描述符fd所引用的管道或FIFO中未读取的字节数。

image-20210130154430225

image-20210130154440374

让父进程和子进程都能够从一个管道中读取和写入数据这种做法并不常见的一个原因是:如果两个进程同时试图从管道中读取数据,那么就无法确定哪个进程会首先读取成功——两个进程竞争数据了。

管道允许进程间的通信:

  • 管道可用于任意两个(或多个)相关进程之间的通信。相关进程的含义:拥有相同的祖先进程,由该祖先进程负责创建管道。shell在构建管道线时所做的工作。
  • 通过UNIX domain socket传递一个文件描述符使得管道的一个文件描述符传递给另一个非相关进程

为何要关闭未使用管道文件描述符

  1. 确保进程不会耗尽文件描述符的限制
  2. 只有当所有进程中所有引用一个管道的文件描述符被关闭之后才会销毁该管道以及释放该管道占用的资源以供其它进程复用
  3. 读取进程要关闭其持有的未使用的管道的写入描述符:当其它进程关闭了写入描述符之后,如果读取进程没有关闭写入描述符,则内核知道至少存在一个管道的写入描述符打开着,则读取进程即使读取了管道中的所有数据后仍不会看到文件结束,并阻塞在read()调用,理论上该读取进程仍然可以向管道写入数据,即使它已经被读取操作阻塞。
  4. 写入进程要关闭其持有的未使用的管道的读取描述符:当一个进程向一个管道写入数据但没有任何进程拥有该管道的打开着的读取描述符时,内核会向写入进程发送一个SIGPIPE信号,默认情况该信号会杀死一个进程。进程也可以捕获或忽略该信号,这样会导致管道上的write()操作因EPIPI错误(已损坏的管道)而失败。收到SIGPIPE信号或得到EPIPE错误对于标示出管道的状态是有用的。如果写入进程没有关闭管道的读取端而其它进程已经关闭了该管道的读取端,则写入进程会将数据充满整个管道,后续的写入请求会被永远阻塞。

程序示例:在父进程和子进程间使用管道通信

#include <sys/wait.h>
#include <bits/types.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

//typedef __ssize_t ssize_t
#define BUF_SIZE 10

int main(int argc, char* argv[]){
    int pfd[2];
    char buf[BUF_SIZE];
    ssize_t numRead;

    if(argc != 2 || strcmp(argv[1], "--help") == 0){
        printf("usage: %s string\n", argv[0]);
        exit(1);
    }

    if(pipe(pfd) == -1){
        printf("error create pipe");
        exit(1);
    }

    switch(fork()){
    case -1:
        printf("error fork");exit(1);
    case 0:
        if(close(pfd[1]) == -1){
            printf("error close child's write of pipe");
            exit(1);
        }
        while(1){
            numRead = read(pfd[0], buf, BUF_SIZE);
            if(numRead == -1){
                printf("error read");exit(1);
            }
            if(numRead == 0)
                break;
            if(write(STDOUT_FILENO, buf, numRead) != numRead)
                printf("child -partial/failed write");
        }

        write(STDOUT_FILENO, "\n", 1);
        if(close(pfd[0]) == -1){
            printf("error close child's read of pipe");
            exit(1);
        }
        _exit(0);
    default:
        if(close(pfd[0]) == -1){
            printf("error close parent's read of pipe");
            exit(1);
        }
        if(write(pfd[1], argv[1], strlen(argv[1])) != strlen(argv[1]))
            printf("parent - partial/failed write");

        if(close(pfd[1]) == -1){
            printf("error close parent's write of pipe");
            exit(1);
        }
        wait(NULL);
        exit(0);
    }
}

将管道作为一种进程同步的方法

使用管道同步具备一个优势:它可以同时用来协调一个进程的动作使之与多个其它(相关)进程匹配。

多个(标准)信号无法排队的事实使得信号不适用于这种情形,相反,信号的优势是它可以被一个进程广播到进程组中的所有成员处。

#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define BUF_SIZE 10

char *currTime(const char *format)
{
    static char buf[BUF_SIZE];  /* Nonreentrant */
    time_t t;
    size_t s;
    struct tm *tm;

    t = time(NULL);
    tm = localtime(&t);
    if (tm == NULL)
        return NULL;

    s = strftime(buf, BUF_SIZE, (format != NULL) ? format : "%c", tm);

    return (s == 0) ? NULL : buf;
}

int main(int argc, char* argv[]){
    int pfd[2];
    int j, dummy;

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        printf("usage: %s sleep-time...\n", argv[0]);
        exit(1);
    }

    setbuf(stdout, NULL);

    printf("%s Parent started\n", currTime("%T"));

    if(pipe(pfd) == -1){
        printf("error pipe");
        exit(1);
    }

    for(j = 1; j < argc; ++j){
        switch(fork()){
            case -1:
                printf("error fork %d", j);
                exit(1);
            case 0:
                if(close(pfd[0]) == -1){
                    printf("error close");
                    exit(1);
                }
                sleep(strtol(argv[j], NULL, 10));

                printf("%s Child %d (PID=%ld) closing pepe\n",
                        currTime("%T"), j, (long)getpid());
                if(close(pfd[1]) == -1){
                    printf("error close");
                    exit(1);
                }
                _exit(0);
            default:
                break;
        }
    }
    if(close(pfd[1]) == -1){
        printf("error close");
        exit(1);
    }

    if(read(pfd[0], &dummy, 1) != 0){
        printf("parent didn't get EOF");
    }
    printf("%s Parent ready to go\n", currTime("%T"));

    exit(0);
}

使用管道连接过滤器

当管道被创建之后,为管道的两端分配的文件描述符是可用描述符中数值最小的两个。

使用管道连接两个过滤器(即从stdin读取和写入到stdout的程序)使得一个程序的标准输出被定向到管道中,而另一个程序的标准输入则从管道中读取。

通常情况下,进程已经使用了描述符0、1、2,为管道分配的文件描述符数值更大。

#include <sys/wait.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char* argv[]){
    int pfd[2];

    if(pipe(pfd) == -1){
        printf("error pipe");
        exit(1);
    }

    switch(fork()){
        case -1:
            printf("error fork");
            exit(1);
        case 0:
            if(close(pfd[0]) == -1){
                printf("error close");
                exit(1);
            }
            if(pfd[1] != STDOUT_FILENO){
                if(dup2(pfd[1], STDOUT_FILENO) == -1){
                    printf("error dup2");
                    exit(1);
                }
                if(close(pfd[1]) == -1){
                    printf("error close");
                    exit(1);
                }
            }
            execlp("ls", "ls", (char*)NULL);
            printf("error execlp ls");
            exit(1);
        default:
            break;
    }
    switch(fork()){
        case -1:
            printf("error fork");
            exit(1);
        case 0:
            if(close(pfd[1]) == -1){
                printf("error close");
                exit(1);
            }
            if(pfd[0] != STDIN_FILENO){
                if(dup2(pfd[0], STDIN_FILENO) == -1){
                    printf("error dup2");
                    exit(1);
                }
                if(close(pfd[0]) == -1){
                    printf("error close");
                    exit(1);
                }
            }
            execlp("wc", "wc", "-l", (char*)NULL);
            printf("error execlp wc");
            exit(1);
        default:
            break;
    }
    if(close(pfd[0]) == -1){
        printf("error close");
        exit(1);
    }
    if(close(pfd[1]) == -1){
        printf("error close");
        exit(1);
    }
    if(wait(NULL) == -1){
        printf("error wait");
        exit(1);
    }
    if(wait(NULL) == -1){
        printf("error wait");
        exit(1);
    }
    
    exit(0);
}

通过管道与shell命令进行通信:popen()

管道的一个常见用途是执行shell命令并读取其输出或向其发送一些输入。popen()和pclose()函数简化了这个任务。

#include <stdio.h>
#include <bits/types/FILE.h>

FILE *popen(const char *command, const char *mode);

int pclose(FILE *stream);

popen()成功时返回可供stdio库函数使用的文件流指针,当发生错误时(如mode不是r或w,创建管道失败,或通过fork()创建子进程失败),popen()会返回NULL并设置errno以标示出发生错误的原因。

popen()函数创建了一个管道,然后创建了一个子进程来执行shell,而shell又创建了一个子进程来执行command字符串。mode参数是一个字符串,它确定调用进程是从管道中读取数据(mode是r)还是将数据写入到管道中(mode是w)。

mode的取值确定了所执行的命令的标准输出是连接到管道的写入端还是将其标准输入连接到管道的读取端。

image-20210131174301562

与使用pipe()创建的管道一样,当从管道中读取数据时,调用进程在command关闭管道的写入端之后会看到文件结束;当向管道写入数据时,如果command已经关闭了管道的读取端,那么调用进程会收到SIGPIPE信号并得到EPIPE错误。

一旦I/O结束之后,可以使用pclose()函数关闭管道并等待子进程中的shell终止。(fclose()函数不等待子进程)。pclose()在成功时会返回子进程中shell的终止状态。如果无法执行shell,那么pclose()会返回一个值就像子进程中的shell通过调用_exit(127)来终止一样。如果发生了其它错误(如无法取得终止状态),pclose()返回-1。

未读:system()和popen()以及pclose()的异同**

程序示例:使用popen()通配文件描述符

#include <ctype.h>
#include <limits.h>
#include <bits/types/FILE.h>
#include <stdbool.h>
#include <limits.h>
#include <string.h>

#include <stdio.h>
#include <stdlib.h>

#define POPEN_FMT "/bin/ls -d %s 2> /dev/null"
#define PAT_SIZE 50
#define PCMD_BUF_SIZE (sizeof(POPEN_FMT) + PAT_SIZE)

int main(int argc, char *argv[])
{
    char pat[PAT_SIZE];
    char popenCmd[PCMD_BUF_SIZE];
    FILE *fp;
    bool badPattern;
    int len, status, fileCnt, j;
    char pathname[PATH_MAX];

    for (;;)
    {
        printf("pattren: ");
        fflush(stdout);
        if (fgets(pat, PAT_SIZE, stdin) == NULL)
            break;
        len = strlen(pat);
        if (len <= 1)
            continue;

        if (pat[len - 1] == '\n')
            pat[len - 1] = '\0';

        for (j = 0, badPattern = false; j < len && !badPattern; ++j)
        {
            if (!isalnum((unsigned char)pat[j]) &&
                strchr("_*?[^-]).", pat[j]) == NULL)
                badPattern = true;
        }
        if (badPattern)
        {
            printf("Bad pattern character: %c\n", pat[j - 1]);
            continue;
        }

        snprintf(popenCmd, PCMD_BUF_SIZE, POPEN_FMT, pat);
        popenCmd[PCMD_BUF_SIZE - 1] = '\0';

        fp = popen(popenCmd, "r");
        if (fp == NULL)
        {
            printf("popen() failed\n");
            continue;
        }

        fileCnt = 0;
        while (fgets(pathname, PATH_MAX, fp) != NULL)
        {
            printf("%s", pathname);
            fileCnt++;
        }

        status = pclose(fp);
        printf("    %d matching file%s\n", fileCnt, (fileCnt != 1) ? "s" : "");
        printf("    pclose() status = %#x\n", (unsigned int)status);
        if (status != -1)
        {
            printf("\t");
            if (WIFEXITED(status))
            {
                printf("child exited, status=%d\n", WEXITSTATUS(status));
            }
            else if (WIFSIGNALED(status))
            {
                printf("child killed by signal %d (%s)",
                       WTERMSIG(status), strsignal(WTERMSIG(status)));
#ifdef WCOREDUMP /* Not in SUSv3, may be absent on some systems */
                if (WCOREDUMP(status))
                    printf(" (core dumped)");
#endif
                printf("\n");
            }
            else if (WIFSTOPPED(status))
            {
                printf("child stopped by signal %d (%s)\n",
                       WSTOPSIG(status), strsignal(WSTOPSIG(status)));

#ifdef WIFCONTINUED /* SUSv3 has this, but older Linux versions and \
                    some other UNIX implementations don't */
            }
            else if (WIFCONTINUED(status))
            {
                printf("child continued\n");
#endif
            }
            else
            { /* Should never happen */
                printf("what happened to this child? (status=%x)\n",
                       (unsigned int)status);
            }
        }
    }
    exit(0);
}

管道和stdio缓冲

由于popen()调用返回的文件流指针没有引用一个终端,因此stdio库会对这种文件流应用块缓冲

当将mode的值设置为w来调用popen()时:默认情况下只有当stdio缓冲器被充满或使用pclose()关闭管道之后输出才会被发送到管道另一端的子进程。如果需要确保子进程能够立即从管道中接收数据,那么就需要定期调用fflush()或使用setbuf(fp, NULL)调用禁用stdio缓冲。

当使用pipe()创建管道,然后使用fdopen()获取一个与管道的写入端对应的stdio流时也可以使用这项技术

当将mode的值设置为r来调用popen()时:如果子进程(写入数据的进程)正在使用stdio库,那么除非调用进程显示调用了fflush()或使用setbuf(fp, NULL)调用禁用stdio缓冲,子进程的输出只有在子进程填满stdio缓冲器或调用了fclose()之后才会对调用进程可用。(使用pipe()创建的管道也适用该规则)

如果无法修改源代码,则可以适用伪终端来替换管道。一个伪终端是一个IPC通道,对进程来讲它就像是一个终端。其结果是stdio库会逐行输出缓冲器中的数据。